使用教程(时序模型)

本文以表格存储Tablestore中的宽表作为上游数据源为例介绍如何使用实时计算Flink写数据到Tablestore的时序表中。

背景信息

Tablestore的时序模型是针对时间序列数据的特点进行设计,适用于物联网设备监控、设备采集数据、机器监控数据等场景。更多信息,请参见时序模型介绍

Tablestore的时序模型中,采用一张二维的时序表来存储时序数据。

每行代表一个时间线在某个时间点的数据,该行的主键部分为时间线标识和时间戳,该行的数据列部分为该时间线在该时间戳下的数据点,可以有多个数据列。其中度量名称(measurement)、数据源(data source)和标签(tags)组成了一个时间线标识,时间戳(time)则标识具体的时间点。

注意事项

  • Flink中的每个TaskManager建议配置2 CPU4GB内存,此配置可以充分发挥每个TaskManager的计算能力。单个TaskManager能达到1万/s的写入速率。

  • source表分区数目足够多的情况下,建议Flink中并发配置在16以内,写入速率随并发线性增长。

  • FlinkTablestore实例必须处于同一专有网络VPC。Tablestore实例的服务地址必须使用VPC地址。

  • 当前支持使用此功能的地域有华东1(杭州)、华东2(上海)、华北2(北京)、华北3(张家口)、华北6(乌兰察布)、华南1(深圳)、中国香港、德国(法兰克福)、美国(弗吉尼亚)、新加坡。

Tablestore数据结果表

Flink支持使用Tablestore时序表存储输出结果。更多信息,请参见表格存储Tablestore连接器

时序模型主要有_m_name_data_source_tags_time四个主键,因此时序表作为结果表时需要指定四个主键,其余配置与数据表作为结果表时的配置相同。目前支持WITH参数,SINK表主键和Map格式主键三种方式指定时序表主键。三种方式_tags列的转换优先级为WITH参数方式的优先级最高,Map格式主键与SINK表主键方式次之。

WITH参数

使用WITH参数方式定义DDL的示例如下:

--创建源表的临时表。
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'true', --是否忽略delete操作的数据。
);

--创建结果表的临时表。
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
);

--将源表数据插入到结果表。
INSERT INTO timeseries_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from
        timeseries_source;

WITH参数说明请参见下表。

参数

适用模型

是否必填

说明

connector

通用参数

连接器类型。固定取值为ots。

endPoint

通用参数

Tablestore实例的服务地址,必须使用实例的VPC地址。更多信息,请参见服务地址

instanceName

通用参数

Tablestore实例的名称。

tableName

通用参数

Tablestore的数据表或者时序表名称。

数据表作为源表时填写数据表名称,时序表作为结果表时填写时序表名称。

tunnelName

宽表模型

Tablestore数据表的数据通道名称。关于创建通道的具体操作,请参见创建数据通道

accessId

通用参数

阿里云账号或者RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)。关于获取AccessKey的具体操作,请参见创建AccessKey

accessKey

通用参数

ignoreDelete

宽表模型

是否忽略DELETE操作类型的实时数据,可选配置。默认值为false。数据表作为源表时可以根据需要配置。

storageType

通用参数

数据存储类型。取值范围如下:

  • WIDE_COLUMN(默认):数据表

    当数据表作为源表时,不配置此参数或者配置此参数为WIDE_COLUMN。

  • TIMESERIES:时序表

    当时序表作为结果表时,必须配置为TIMESERIES。

timeseriesSchema

时序模型

需要指定为时序表主键的列。以JSONkey-value格式来指定时序表主键,例如{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}

配置的主键类型必须与时序表中主键类型一致。其中tags主键可以支持同时包含多列。

SINK表主键

时序结果表的DDL定义示例如下所示。主键定义中的第一位measurement为_m_name列,第二位datasource为_data_source列,最后一位timetime列,中间的多列为tag列。

使用SINK表主键方式定义DDL的示例如下:

CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
    PRIMARY KEY(measurement, datasource, tag_a,tag_b,tag_c,tag_d,tag_e,tag_f `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
);

Map格式的主键

对于时序Sink表主键,Tablestore引入了FlinkMap类型便于生成时序模型中时序表的_tags列,Map类型可以支持列的改名、简单函数等映射操作。使用Map时必须保证其中的_tags主键声明位置在第三位。

--创建源表的临时表。
CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_widecolume_source_table',
    'tunnelName' = 'test_widecolume_source_tunnel',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'true', --是否忽略delete操作的数据。
);
--创建结果表的临时表。
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'test_timeseries_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
);

--将源表数据插入到结果表。
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP[`tag_a`,`tag_b`,`tag_c`,`tag_d`,`tag_e`,`tag_f`] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;

实时计算作业开发流程

前提条件

  • 已创建AccessKey。具体操作,请参见创建AccessKey

  • 已为Tablestore数据表(源表)创建数据通道。具体操作,请参见创建数据通道

步骤一:创建作业

  1. 进入SQL作业创建页面。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL

  2. 单击新建后,在新建作业草稿对话框,选择空白的流作业草稿,单击下一步

    Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

  3. 单击下一步

  4. 填写作业配置信息。

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    存储位置

    指定该作业的代码文件所属的文件夹。

    您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    作业草稿

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-6.0.4-flink-1.15

  5. 单击创建

步骤二:编写作业代码

  1. 创建一个源表(Tablestore数据表)和结果表(Tablestore时序表)的临时表。

    说明

    在生产作业中,建议您尽量减少临时表的使用,直接使用元数据管理中已经注册的表。

    创建一个timeseries_sourcetimeseries_sink临时表代码示例如下:

    CREATE TEMPORARY TABLE timeseries_source (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING
    ) 
    WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_widecolume_source_table',
        'tunnelName' = 'test_widecolume_source_tunnel',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        --是否忽略delete操作的数据。
        'ignoreDelete' = 'true', 
    );
    
    CREATE TEMPORARY TABLE timeseries_sink (
        measurement STRING,
        datasource STRING,
        tag_a STRING,
        `time` BIGINT,
        binary_value BINARY,
        bool_value BOOLEAN,
        double_value DOUBLE,
        long_value BIGINT,
        string_value STRING,
        tag_b STRING,
        tag_c STRING,
        tag_d STRING,
        tag_e STRING,
        tag_f STRING,
        PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
    ) 
    WITH (
        'connector' = 'ots',
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
        'instanceName' = 'xxx',
        'tableName' = 'test_timeseries_sink_table',
        'accessId' = 'xxxxxxxxxxx',
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'storageType' = 'TIMESERIES',
        'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
    );
  2. 编写作业逻辑。

    将源表数据插入到结果表的代码示例如下:

    INSERT INTO timeseries_sink
        select 
        measurement,
        datasource,
        tag_a,
        `time`,
        binary_value,
        bool_value,
        double_value,
        long_value,
        string_value,
        tag_b,
        tag_c,
        tag_d,
        tag_e,
        tag_f
        from
            timeseries_source;

步骤三:进行更多配置

SQL编辑区域右侧页签,您可以查看或修改相关配置。

页签名称

配置说明

更多配置

  • 引擎版本:引擎版本详情请参见引擎版本介绍生命周期策略。建议您使用推荐版本或稳定版本,引擎版本标记含义详情如下:

    • 推荐版本(Recommend):当前最新大版本下的最新小版本。

    • 稳定版本(Stable):还在产品服务期内的大版本下最新的小版本,已修复历史版本缺陷。

    • 普通版本(Normal):还在产品服务期内的其他小版本。

    • EOS版本(Deprecated):超过产品服务期限的版本。

  • 附加依赖文件:作业中需要使用到的附加依赖,例如临时函数等。

    如果没有VVR的权限,您可以下载VVR依赖,并在资源上传页面进行上传,然后选择附加依赖文件为上传的VVR依赖即可。具体操作,请参见附录:配置VVR依赖

代码结构

  • 数据流向图:您可以通过数据流向图快速查看出数据的流向。

  • 树状结构图:您可以通过树状结构图快速查看出数据的来源。

版本信息

您可以在此处查看作业版本信息,操作列下的功能详情请参见管理作业版本

步骤四:进行深度检查

SQL编辑区域顶部,单击深度检查,进行语法检查。

(可选)步骤五:进行作业调试

SQL编辑区域顶部,单击调试

您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECTINSERT业务逻辑的正确性,提升开发效率,降低数据质量风险,详情请参见作业调试

步骤六:作业部署

在作业开发页面顶部,单击部署,在部署新版本对话框,可根据需要填写或选中相关内容,单击确认部署

说明

Session集群适用于非生产环境的开发测试环境,您可以使用Session集群模式部署或调试作业,提高作业JM(Job Manager)资源利用率和提高作业启动速度。但不推荐您将作业提交至Session集群中,因为会存在业务稳定性问题。具体操作,请参见步骤一:创建Session集群

步骤七:启动并查看Flink计算结果

说明

如果您对作业进行了修改(例如更改SQL代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要先上线,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。作业停止详情请参见作业停止

  1. 在左侧导航栏,单击作业运维

  2. 单击目标作业名称操作列中的启动

    作业启动参数配置详情请参见作业启动。单击启动后,您可以看到作业状态变为运行中,则代表作业运行正常。

  3. 在作业运维详情页面,查看Flink计算结果。

    1. 作业运维页面,单击目标作业名称。

    2. 单击作业探查

    3. 运行日志页签,单击运行Task Managers页签下的Path, ID

    4. 单击日志,在页面搜索Sink相关的日志信息。

附录:配置VVR依赖

  1. 下载VVR依赖

  2. 上传VVR依赖。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击文件管理

    4. 资源文件页签,单击上传资源,选择要上传的VVR依赖的JAR包。

  3. 在目标作业的SQL编辑区域右侧页签,单击更多配置。在附加依赖文件项,选择目标VVR依赖的JAR包。